In [1]:
import os
import sys

spark_path = "/Users/flavio.clesio/Documents/spark-2.1.0" 

os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip") # Must be the same version of your Spark Version

In [2]:
from pyspark import SparkContext
from pyspark import SparkConf

In [3]:
conf = (SparkConf()
 .setMaster("local")
 .setAppName("My app")
 .set("spark.executor.memory", "4g"))

In [4]:
sc = SparkContext(conf=conf)

In [ ]:
sc

In [5]:
ROOT_PATH = '/Users/flavio.clesio/Downloads/ml-100k'

In [6]:
movielens = sc.textFile(ROOT_PATH + "/u.data")

Source (MovieLens 100k README):

u.data -- The full u data set: 100,000 ratings by 943 users on 1,682 items. Each user has rated at least 20 movies. Users and items are numbered consecutively from 1. The data is randomly ordered. This is a tab-separated list of user id | item id | rating | timestamp.
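
A minimal sketch (illustrative, not part of the original walkthrough): labelling the four tab-separated fields makes the records easier to read when inspecting them.

In [ ]:
# Label the four tab-separated fields of u.data (sketch; field names are illustrative)
from collections import namedtuple
Record = namedtuple('Record', ['user', 'item', 'rating', 'timestamp'])
movielens.map(lambda l: Record(*l.split('\t'))).first()
# e.g. Record(user=u'196', item=u'242', rating=u'3', timestamp=u'881250949')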


In [7]:
movielens.first()


Out[7]:
u'196\t242\t3\t881250949'

In [8]:
movielens.count()


Out[8]:
100000

In [9]:
#Clean up the data by splitting it
#The MovieLens README says each line is tab-separated:
#user id, item id, rating, timestamp
clean_data = movielens.map(lambda x:x.split('\t'))

In [10]:
clean_data.take(10)


Out[10]:
[[u'196', u'242', u'3', u'881250949'],
 [u'186', u'302', u'3', u'891717742'],
 [u'22', u'377', u'1', u'878887116'],
 [u'244', u'51', u'2', u'880606923'],
 [u'166', u'346', u'1', u'886397596'],
 [u'298', u'474', u'4', u'884182806'],
 [u'115', u'265', u'2', u'881171488'],
 [u'253', u'465', u'5', u'891628467'],
 [u'305', u'451', u'3', u'886324817'],
 [u'6', u'86', u'3', u'883603013']]

In [11]:
#As an example, extract just the ratings into their own RDD
#rate.first() is 3
rate = clean_data.map(lambda y: int(y[2]))

In [12]:
rate.mean() #Avg rating is 3.52986


Out[12]:
3.529859999999947
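
Beyond the mean, the full rating distribution is one call away; a quick sketch (not run here):

In [ ]:
# Count how many times each rating value (1-5) appears
# (countByValue returns a dict of rating -> count on the driver)
sorted(rate.countByValue().items())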

In [13]:
#Extract just the users
users = clean_data.map(lambda y: int(y[0]))

In [14]:
users.distinct().count() #943 users


Out[14]:
943

In [15]:
#You don't have to extract the data into its own RDD
#This command counts the distinct movies
#There are 1,682 movies
clean_data.map(lambda y: int(y[1])).distinct().count()


Out[15]:
1682

In [16]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import MatrixFactorizationModel
from pyspark.mllib.recommendation import Rating

In [17]:
#We'll need to map the MovieLens data to Rating objects
#A Rating object is made up of (user, product, rating)
mls = movielens.map(lambda l: l.split('\t'))

In [18]:
ratings = mls.map(lambda x: Rating(int(x[0]),int(x[1]), float(x[2])))

In [19]:
#Need a training and test set
train, test = ratings.randomSplit([0.8,0.2],7856)

In [20]:
print 'The number of training instances is:', train.count()


The number of training instances is: 79909

In [21]:
print 'The number of test instances is:', test.count()


The number of test instances is: 20091

In [22]:
#Need to cache the data to speed up training
train.cache()


Out[22]:
PythonRDD[18] at RDD at PythonRDD.scala:48

In [23]:
test.cache()


Out[23]:
PythonRDD[19] at RDD at PythonRDD.scala:48

In [24]:
#Setting up the parameters for ALS
rank = 5           # Number of latent factors
numIterations = 10 # Number of ALS iterations

In [25]:
#Create the model on the training data
model = ALS.train(train, rank, numIterations)
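
The call above uses MLlib's defaults for everything else. As a sketch (the parameter values below are illustrative, not tuned), the same training call with an explicit regularization term and a fixed seed would look like this:

In [ ]:
# Sketch only: regularized, seeded variant of the training call above
model_reg = ALS.train(train, rank, numIterations, lambda_=0.01, seed=7856)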

In [26]:
#Examine the latent features for one product
model.productFeatures().first()


Out[26]:
(1,
 array('d', [0.18627141416072845, 0.35220956802368164, 0.4609048068523407, -2.304429054260254, -0.06592054665088654]))

In [27]:
#Examine the latent features for one user
model.userFeatures().first()


Out[27]:
(1,
 array('d', [0.23921744525432587, 0.4624086618423462, 1.2235586643218994, -1.3425638675689697, 0.19017109274864197]))
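
Each prediction is just the dot product of a user's and a product's latent-factor vectors; a quick sketch (not run here) using the two vectors shown above:

In [ ]:
# The predicted rating for (user 1, product 1) is the dot product of the
# two latent-factor vectors printed above
u1 = model.userFeatures().lookup(1)[0]
p1 = model.productFeatures().lookup(1)[0]
print sum(a * b for a, b in zip(u1, p1))  # should match model.predict(1, 1)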

In [28]:
# For Product X, Find N Users to Sell To
model.recommendUsers(242,100)


Out[28]:
[Rating(user=169, product=242, rating=5.906993115535968),
 Rating(user=97, product=242, rating=5.8877262624830475),
 Rating(user=443, product=242, rating=5.615114079579772),
 Rating(user=895, product=242, rating=5.490619320476549),
 Rating(user=353, product=242, rating=5.388828767130553),
 Rating(user=98, product=242, rating=5.357527553816799),
 Rating(user=4, product=242, rating=5.271224238730753),
 Rating(user=34, product=242, rating=5.237695136852757),
 Rating(user=583, product=242, rating=5.20319254876104),
 Rating(user=511, product=242, rating=5.176521175388039),
 Rating(user=22, product=242, rating=5.173710902830484),
 Rating(user=270, product=242, rating=5.172565262002639),
 Rating(user=165, product=242, rating=5.162151298220143),
 Rating(user=770, product=242, rating=5.157765735022165),
 Rating(user=240, product=242, rating=5.1574108425414895),
 Rating(user=888, product=242, rating=5.151970502353445),
 Rating(user=180, product=242, rating=5.1426624031315376),
 Rating(user=842, product=242, rating=5.1242507031643925),
 Rating(user=520, product=242, rating=5.111843538107488),
 Rating(user=697, product=242, rating=5.111600314005836),
 Rating(user=273, product=242, rating=5.098623141637639),
 Rating(user=310, product=242, rating=5.098238965784004),
 Rating(user=274, product=242, rating=5.091307645140562),
 Rating(user=941, product=242, rating=5.06355609308808),
 Rating(user=46, product=242, rating=5.058602351261012),
 Rating(user=928, product=242, rating=5.029388348310919),
 Rating(user=803, product=242, rating=5.027094842363262),
 Rating(user=531, product=242, rating=5.018689299850635),
 Rating(user=9, product=242, rating=5.004466553551705),
 Rating(user=153, product=242, rating=5.001260630177852),
 Rating(user=341, product=242, rating=4.9959172490607),
 Rating(user=355, product=242, rating=4.987396570370837),
 Rating(user=923, product=242, rating=4.970419276835539),
 Rating(user=182, product=242, rating=4.96336815036501),
 Rating(user=613, product=242, rating=4.934249701362775),
 Rating(user=414, product=242, rating=4.931197720515597),
 Rating(user=367, product=242, rating=4.908425924740504),
 Rating(user=696, product=242, rating=4.901930000402363),
 Rating(user=753, product=242, rating=4.896993563369895),
 Rating(user=849, product=242, rating=4.894606861047926),
 Rating(user=732, product=242, rating=4.8863510787536875),
 Rating(user=122, product=242, rating=4.882038861712724),
 Rating(user=295, product=242, rating=4.878154999532819),
 Rating(user=78, product=242, rating=4.877135765344423),
 Rating(user=711, product=242, rating=4.876739808776248),
 Rating(user=174, product=242, rating=4.8718177493531485),
 Rating(user=173, product=242, rating=4.8703707596854935),
 Rating(user=249, product=242, rating=4.8671894610304065),
 Rating(user=691, product=242, rating=4.855751860127286),
 Rating(user=611, product=242, rating=4.841037864503663),
 Rating(user=324, product=242, rating=4.839108344636913),
 Rating(user=415, product=242, rating=4.832446094016953),
 Rating(user=75, product=242, rating=4.829112269481126),
 Rating(user=212, product=242, rating=4.822135688247172),
 Rating(user=701, product=242, rating=4.815608758287317),
 Rating(user=776, product=242, rating=4.8137465615342965),
 Rating(user=472, product=242, rating=4.8102741015527535),
 Rating(user=517, product=242, rating=4.795911845466298),
 Rating(user=686, product=242, rating=4.79333894578126),
 Rating(user=808, product=242, rating=4.785857213758975),
 Rating(user=420, product=242, rating=4.773990477318325),
 Rating(user=675, product=242, rating=4.772299112157708),
 Rating(user=108, product=242, rating=4.771353530214936),
 Rating(user=131, product=242, rating=4.7703052188464525),
 Rating(user=526, product=242, rating=4.767511984475665),
 Rating(user=343, product=242, rating=4.76661650178143),
 Rating(user=339, product=242, rating=4.751910369202424),
 Rating(user=469, product=242, rating=4.744817839188438),
 Rating(user=641, product=242, rating=4.73782674370646),
 Rating(user=558, product=242, rating=4.735074426325124),
 Rating(user=118, product=242, rating=4.728877525374468),
 Rating(user=572, product=242, rating=4.7281876675497445),
 Rating(user=742, product=242, rating=4.717780738294106),
 Rating(user=810, product=242, rating=4.711981341579477),
 Rating(user=423, product=242, rating=4.709937336548089),
 Rating(user=794, product=242, rating=4.706235224329133),
 Rating(user=244, product=242, rating=4.7054675055004935),
 Rating(user=257, product=242, rating=4.7026982808103),
 Rating(user=555, product=242, rating=4.69908475392647),
 Rating(user=252, product=242, rating=4.6962762801970355),
 Rating(user=592, product=242, rating=4.692136139382635),
 Rating(user=867, product=242, rating=4.692049871803102),
 Rating(user=94, product=242, rating=4.690813140730307),
 Rating(user=508, product=242, rating=4.690474178345996),
 Rating(user=157, product=242, rating=4.690191973718131),
 Rating(user=315, product=242, rating=4.687048390458035),
 Rating(user=136, product=242, rating=4.682941627458381),
 Rating(user=12, product=242, rating=4.680569321853612),
 Rating(user=267, product=242, rating=4.676972546908312),
 Rating(user=21, product=242, rating=4.668809793268874),
 Rating(user=765, product=242, rating=4.661230187716204),
 Rating(user=103, product=242, rating=4.657294182082613),
 Rating(user=580, product=242, rating=4.65641345548333),
 Rating(user=233, product=242, rating=4.654174905025756),
 Rating(user=30, product=242, rating=4.652469368095604),
 Rating(user=565, product=242, rating=4.645448618178349),
 Rating(user=147, product=242, rating=4.644322340456501),
 Rating(user=891, product=242, rating=4.642683246718532),
 Rating(user=725, product=242, rating=4.639886486943968),
 Rating(user=523, product=242, rating=4.636381716452316)]

In [29]:
# For User Y Find N Products to Promote
model.recommendProducts(196,10)


Out[29]:
[Rating(user=196, product=593, rating=6.984630282120642),
 Rating(user=196, product=1664, rating=6.325721017278127),
 Rating(user=196, product=361, rating=5.85662119959493),
 Rating(user=196, product=867, rating=5.751188326142186),
 Rating(user=196, product=1426, rating=5.739967376648849),
 Rating(user=196, product=1207, rating=5.650164178615768),
 Rating(user=196, product=1166, rating=5.444729524362961),
 Rating(user=196, product=1155, rating=5.44236415401739),
 Rating(user=196, product=1643, rating=5.363915512556903),
 Rating(user=196, product=1594, rating=5.271297493439602)]
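
The recommendations come back as raw item ids. As a sketch (assuming u.item sits next to u.data in ROOT_PATH), they can be mapped to movie titles: u.item is pipe-separated and Latin-1 encoded, with the movie id and title as its first two fields.

In [ ]:
# Map recommended product ids to movie titles (sketch). u.item is read as raw
# bytes because it is Latin-1 encoded, not UTF-8.
id_to_title = (sc.textFile(ROOT_PATH + "/u.item", use_unicode=False)
               .map(lambda l: l.split('|'))
               .map(lambda f: (int(f[0]), f[1]))
               .collectAsMap())
for r in model.recommendProducts(196, 10):
    print id_to_title[r.product], round(r.rating, 2)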

In [30]:
#Predict Single Product for Single User
model.predict(196, 242)


Out[30]:
3.8662024781956785
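
For context, the observed rating for this pair is 3 (it is the first record of u.data, shown in Out[7]); a quick sketch to pull it back out of the ratings RDD:

In [ ]:
# The actual rating user 196 gave item 242, for comparison with the
# prediction above
ratings.filter(lambda r: r.user == 196 and r.product == 242).collect()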

In [31]:
# Predict Multi Users and Multi Products
# Pre-Processing
pred_input = train.map(lambda x:(x[0],x[1]))

In [32]:
# Lots of Predictions
#Returns Rating(user, product, predicted rating) objects
pred = model.predictAll(pred_input)

In [33]:
#Get Performance Estimate
#Organize the data so that (user, product) is the key
true_reorg = train.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = pred.map(lambda x:((x[0],x[1]), x[2]))

In [34]:
#Do the actual join
true_pred = true_reorg.join(pred_reorg)

In [35]:
#Need to be able to square root the Mean-Squared Error
from math import sqrt

In [36]:
MSE = true_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
RMSE = sqrt(MSE) # Training-set RMSE; ALS starts from a random initialization, so the exact value varies between runs (see Out[43])

In [37]:
#Test Set Evaluation
#More dense, but nothing we haven't done before
test_input = test.map(lambda x:(x[0],x[1])) 
pred_test = model.predictAll(test_input)
test_reorg = test.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = pred_test.map(lambda x:((x[0],x[1]), x[2]))
test_pred = test_reorg.join(pred_reorg)
test_MSE = test_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
test_RMSE = sqrt(test_MSE)#1.0145549956596238
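
The train and test evaluations repeat the same join-and-average steps; as a sketch, they could be wrapped in a small helper (the function name is illustrative):

In [ ]:
# Reusable RMSE helper (sketch): predict for every (user, product) pair in
# `data`, join the predictions with the observed ratings, and average the
# squared errors
def compute_rmse(model, data):
    preds = model.predictAll(data.map(lambda r: (r.user, r.product)))
    pred_kv = preds.map(lambda r: ((r.user, r.product), r.rating))
    true_kv = data.map(lambda r: ((r.user, r.product), r.rating))
    return sqrt(true_kv.join(pred_kv).map(lambda kv: (kv[1][0] - kv[1][1]) ** 2).mean())

# compute_rmse(model, train) and compute_rmse(model, test) should reproduce
# RMSE and test_RMSE above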

In [38]:
#If you're happy, save your model!
#model.save(sc,ROOT_PATH + "/ml-model")
#sameModel = MatrixFactorizationModel.load(sc, ROOT_PATH + "/ml-model")

In [43]:
RMSE


Out[43]:
0.772560452603972